FastAPI এবং Celery এর Integration (Task Queue)

Web Development - ফাস্টএপিআই (FastAPI)
296

FastAPI এবং Celery ব্যবহার করে Task Queue পরিচালনা করা একটি শক্তিশালী উপায় যা asynchronous task processing নিশ্চিত করে। এটি দীর্ঘ-running টাস্কের জন্য খুবই কার্যকর, যেমন ইমেইল পাঠানো, ভিডিও প্রসেসিং, ডেটা বিশ্লেষণ, বা কোনো ব্যাকগ্রাউন্ড প্রসেস। Celery একটি জনপ্রিয় Python টুল যা task queue এবং distributed task processing এ ব্যবহৃত হয়।

এখানে আমরা দেখব FastAPI এবং Celery কীভাবে একসাথে কাজ করতে পারে।


Step 1: প্রয়োজনীয় প্যাকেজ ইনস্টল করা

প্রথমে, FastAPI এবং Celery এর জন্য প্রয়োজনীয় প্যাকেজ ইনস্টল করুন। Celery এবং Redis (যেহেতু Redis একটি জনপ্রিয় ব্রোকার হিসেবে ব্যবহৃত হয় Celery-তে) ইনস্টল করতে হবে।

pip install fastapi uvicorn celery[redis] redis
  • FastAPI: API তৈরি করতে।
  • Celery: Task Queue ম্যানেজ করতে।
  • Redis: Celery ব্রোকার হিসেবে ব্যবহৃত হবে।

Step 2: Celery কনফিগারেশন

Celery ব্যবহার করার জন্য, আপনাকে প্রথমে একটি Celery অ্যাপ তৈরি করতে হবে এবং সেটি FastAPI অ্যাপের সঙ্গে সংযুক্ত করতে হবে। আমরা Redis ব্রোকার ব্যবহার করব।

উদাহরণ: Celery কনফিগারেশন

# celery_worker.py

from celery import Celery

# Celery অ্যাপ তৈরি
app = Celery("worker", broker="redis://localhost:6379/0")

# Task ফাংশন
@app.task
def send_email(email: str):
    # ইমেইল পাঠানোর লজিক এখানে থাকবে
    return f"Email sent to {email}"

এখানে:

  • Celery অ্যাপ তৈরি করা হয়েছে যা Redis ব্রোকার ব্যবহার করছে।
  • send_email একটি অ্যাসিনক্রোনাস টাস্ক যা ইমেইল পাঠানোর জন্য ব্যবহৃত হবে।

Step 3: FastAPI অ্যাপ তৈরি করা

এখন আমরা FastAPI অ্যাপ তৈরি করব যেখানে Celery এর টাস্কগুলিকে কল করা হবে।

উদাহরণ: FastAPI অ্যাপ

# main.py

from fastapi import FastAPI
from celery_worker import send_email  # Celery থেকে টাস্ক ইমপোর্ট

app = FastAPI()

@app.post("/send-email/")
async def send_email_task(email: str):
    # Celery টাস্ককে ব্যাকগ্রাউন্ডে রান করা
    send_email.apply_async(args=[email])
    return {"message": "Email task has been initiated"}

এখানে:

  • FastAPI অ্যাপ তৈরি করা হয়েছে।
  • send_email টাস্ক apply_async এর মাধ্যমে ব্যাকগ্রাউন্ডে চালানোর জন্য কল করা হয়েছে।

Step 4: Redis সার্ভার চালানো

Redis একটি ইন-মেমরি ডেটা স্টোর যা Celery টাস্ক ব্রোকার হিসেবে কাজ করবে। Redis চালানোর জন্য নিচের কমান্ডটি ব্যবহার করুন:

redis-server

Redis সার্ভার চালু থাকা উচিত যাতে Celery টাস্ক ব্রোকার হিসেবে কাজ করতে পারে।


Step 5: Celery Worker চালানো

এখন Celery Worker চালানোর সময় এসেছে, যাতে Celery টাস্কগুলি প্রসেস করতে পারে।

celery -A celery_worker.app worker --loglevel=info

এখানে:

  • celery_worker.app: Celery অ্যাপের অবস্থান।
  • worker: Celery Worker চালানোর জন্য।
  • --loglevel=info: লগ লেভেল নির্ধারণ করা।

Step 6: FastAPI অ্যাপ চালানো

FastAPI অ্যাপ চালানোর জন্য নিচের কমান্ডটি ব্যবহার করুন:

uvicorn main:app --reload

এখানে, main হল আপনার FastAPI ফাইলের নাম এবং app হল FastAPI অ্যাপের অবজেক্ট।


Step 7: API কল এবং Task Execution

এখন আপনি POST রিকোয়েস্ট পাঠিয়ে /send-email/ এন্ডপয়েন্টে ইমেইল পাঠানোর টাস্ক ট্রিগার করতে পারবেন।

রিকোয়েস্ট উদাহরণ:

curl -X 'POST' \
  'http://127.0.0.1:8000/send-email/' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{"email": "user@example.com"}'

রেসপন্স:

{
  "message": "Email task has been initiated"
}

এখানে, send_email টাস্কটি Celery Worker দ্বারা ব্যাকগ্রাউন্ডে কার্যকর হবে এবং আপনি ইমেইল পাঠানোর প্রক্রিয়া চালু করতে পারবেন।


Step 8: Celery Worker টাস্ক মনিটর করা

Celery Worker টাস্কের কার্যক্রম মনিটর করতে, আপনি লগ দেখতে পারেন, যা Redis থেকে টাস্কগুলি গ্রহণ এবং প্রক্রিয়া করছে।

উদাহরণ: Worker টাস্ক লেভেল লগ:

[2024-12-13 14:38:02,111: INFO/MainProcess] Task celery_worker.send_email[6c49d5ee-d9b1-4c55-b457-9e78e1ba18d9] succeeded in 0.45s: 'Email sent to user@example.com'

এটি নিশ্চিত করে যে, Celery Worker সফলভাবে email task সম্পন্ন করেছে।


FastAPI এবং Celery এর ইন্টিগ্রেশন (Task Queue) ব্যবহার করে আপনি asynchronous টাস্কগুলি background processing করতে পারবেন। Celery এর মাধ্যমে আপনি দীর্ঘ-running এবং resource-intensive কাজগুলিকে ব্যাকগ্রাউন্ডে প্রক্রিয়া করতে পারবেন, যেমন ইমেইল পাঠানো, ভিডিও প্রসেসিং, ডেটা বিশ্লেষণ ইত্যাদি। Redis ব্রোকার ব্যবহার করে Celery-কে FastAPI অ্যাপের সাথে সংযুক্ত করা খুবই সহজ এবং কার্যকর।

Content added By

Celery কী এবং কীভাবে কাজ করে?

199

Celery হলো একটি Asynchronous Task Queue বা Distributed Task Queue যা Python-ভিত্তিক এবং ডিজাইন করা হয়েছে দীর্ঘ-running বা ব্যাকগ্রাউন্ড টাস্কগুলি প্রক্রিয়া করার জন্য। এটি মূলত কাজের লোডকে সামাল দেওয়ার জন্য ব্যবহৃত হয়, যেখানে কিছু কাজ পেছনে চলে যেতে পারে, যেমন ইমেইল পাঠানো, ফাইল প্রসেসিং, ডেটা বিশ্লেষণ ইত্যাদি। Celery কাজ করার জন্য message brokers (যেমন RabbitMQ, Redis) ব্যবহার করে।

FastAPI এর সাথে Celery ইন্টিগ্রেট করা গেলে, আমরা Async Task বা Background Task অ্যাসিনক্রোনাসভাবে চালাতে পারি, এবং মূল অ্যাপের রেসপন্স দ্রুত ফিরে আসতে সাহায্য করতে পারি।


Celery কী?

Celery হলো একটি টাস্ক কিউ (Task Queue) সিস্টেম যা মূলত কিছু কাজকে ব্যাকগ্রাউন্ডে প্রক্রিয়া করার জন্য ব্যবহৃত হয়। এটি মূল অ্যাপ্লিকেশনের সাথে সমান্তরালে কাজ করে এবং task বা job গুলোকে পরবর্তী সময়ে সঞ্চালন করতে সহায়তা করে।

Celery সাধারণত message brokers (যেমন RabbitMQ, Redis) ব্যবহার করে কাজের অর্ডার বা কাজের বিস্তারিত বার্তা (task) পাঠায় এবং workers সেই কাজগুলো প্রসেস করে। এটি asynchronous এবং distributed কাজ করার জন্য উপযুক্ত, যার মাধ্যমে ভারী বা দীর্ঘ-running কাজগুলি প্রক্রিয়া করা সম্ভব হয়।


Celery এবং FastAPI এর Integration

FastAPI এবং Celery ইন্টিগ্রেট করার মাধ্যমে আপনি Asynchronous Task Queue পরিচালনা করতে পারেন, যার মাধ্যমে আপনার FastAPI অ্যাপ্লিকেশন ব্যাকগ্রাউন্ডে বড় ও সময়সাপেক্ষ কাজ করতে পারে।


Celery কীভাবে কাজ করে?

Celery কাজ করার জন্য নিচের প্রধান উপাদানগুলির প্রয়োজন:

  1. Task Queue - একটি কিউ যেখানে টাস্কগুলি রাখা হয়। সাধারণত, Redis বা RabbitMQ ব্যবহৃত হয়।
  2. Workers - যারা কিউ থেকে টাস্ক গ্রহণ করে এবং তা প্রক্রিয়া করে।
  3. Brokers - যারা কিউ এবং টাস্কগুলোর মধ্যে মেসেজ ট্রান্সফার করে (যেমন Redis বা RabbitMQ)।

Celery এর কাজের ধাপ:

  1. Task Creation: একটি অ্যাপ্লিকেশন Celery টাস্ক তৈরি করে। এই টাস্কটি কাজটি এক্সিকিউট করার জন্য একটি ফাংশন হতে পারে।
  2. Task Submission: টাস্কটি Celery কিউতে জমা দেওয়া হয়।
  3. Task Execution by Worker: Celery Worker কিউ থেকে টাস্ক নেয় এবং প্রক্রিয়া শুরু করে।
  4. Task Completion: টাস্ক সম্পন্ন হলে, Celery Worker তা রিপোর্ট বা রেসপন্স প্রেরণ করতে পারে।

FastAPI এবং Celery সেটআপ করা

FastAPI এবং Celery ইন্টিগ্রেট করতে নিচের ধাপগুলো অনুসরণ করা হবে:


Step 1: প্রয়োজনীয় প্যাকেজ ইনস্টল করা

প্রথমে আপনাকে FastAPI, Celery, এবং Redis প্যাকেজ ইনস্টল করতে হবে। আমরা Redis ব্যবহার করব ব্রোকার হিসেবে।

pip install fastapi uvicorn celery redis

Step 2: Celery কনফিগারেশন

Celery কনফিগারেশন এবং টাস্ক তৈরি করা:

উদাহরণ: celery_worker.py

from celery import Celery

# Celery কনফিগারেশন
app = Celery('worker', broker='redis://localhost:6379/0')

@app.task
def send_email(email: str):
    # Email পাঠানোর কাজ (এটি একটি উদাহরণ, বাস্তব কাজ হবে)
    print(f"Sending email to {email}")
    return f"Email sent to {email}"

এখানে, send_email একটি সিম্পল টাস্ক যা একটি ইমেইল ঠিকানায় ইমেইল পাঠানোর উদাহরণ হিসেবে দেখানো হয়েছে। এই টাস্কটি ব্যাকগ্রাউন্ডে চলবে।

Celery worker চালানো:

celery -A celery_worker.app worker --loglevel=info

এই কমান্ডটি Celery Worker চালু করবে, যা Redis ব্রোকার থেকে টাস্ক গ্রহণ করবে এবং প্রসেস করবে।


Step 3: FastAPI কনফিগারেশন

এখন FastAPI অ্যাপ্লিকেশন তৈরি করতে হবে যা এই Celery টাস্কগুলি কার্যকর করবে।

উদাহরণ: main.py

from fastapi import FastAPI
from celery_worker import send_email

app = FastAPI()

@app.post("/send-email/")
async def create_email_task(email: str):
    # Celery টাস্ক কল করা
    send_email.apply_async(args=[email])
    return {"message": f"Email task for {email} is being processed in the background"}

এখানে, /send-email/ রাউটটি একটি POST রিকোয়েস্ট গ্রহণ করে, ইমেইল পাঠানোর জন্য Celery টাস্ককে apply_async() ব্যবহার করে কল করবে।


Step 4: Uvicorn দিয়ে FastAPI অ্যাপ চালানো

FastAPI অ্যাপ চালানোর জন্য Uvicorn ব্যবহার করা হয়।

uvicorn main:app --reload

এটি FastAPI অ্যাপ চালু করবে।


Step 5: টেস্ট রিকোয়েস্ট

এখন আপনি একটি POST রিকোয়েস্ট পাঠাতে পারেন যা Celery টাস্ককে ট্রিগার করবে।

POST রিকোয়েস্ট:

URL: /send-email/
Body:

{
  "email": "user@example.com"
}

রেসপন্স:

{
  "message": "Email task for user@example.com is being processed in the background"
}

এখানে, Celery worker ব্যাকগ্রাউন্ডে ইমেইল পাঠানোর কাজ করবে এবং তা আপনার মূল অ্যাপ্লিকেশন থেকে পৃথকভাবে প্রসেস হবে।


Step 6: Celery Worker এর লগ চেক করা

Celery Worker-এ send_email টাস্কটি প্রসেস হচ্ছে কিনা তা দেখতে, celery_worker.py চালু রাখুন এবং Redis এর লগ দেখুন। যখন আপনি POST রিকোয়েস্ট পাঠাবেন, তখন Celery Worker টাস্কটি গ্রহণ করবে এবং সম্পন্ন করবে।


Celery এবং FastAPI এর ইন্টিগ্রেশন ব্যাকগ্রাউন্ডে লম্বা-running বা সময়সাপেক্ষ কাজ করার জন্য উপযুক্ত। Task Queue এর মাধ্যমে FastAPI অ্যাপ্লিকেশনের পারফরম্যান্স উন্নত করা যায় এবং ব্যবহারকারীকে দ্রুত রেসপন্স প্রদান করা সম্ভব হয়। Celery এর মাধ্যমে আপনি ইমেইল পাঠানো, ফাইল প্রসেসিং, ডেটা বিশ্লেষণ ইত্যাদি কাজগুলিকে ব্যাকগ্রাউন্ডে প্রসেস করতে পারেন, যা অ্যাপ্লিকেশনের কার্যক্ষমতা এবং স্কেলেবিলিটি বৃদ্ধি করে।

Content added By

FastAPI এবং Celery এর ইন্টিগ্রেশন

206

FastAPI এবং Celery এর ইন্টিগ্রেশন খুবই কার্যকরী যখন আপনি অ্যাসিঙ্ক্রোনাস টাস্ক পরিচালনা করতে চান, যেমন ব্যাকগ্রাউন্ডে বড় বা সময়সাপেক্ষ কাজগুলো (যেমন ইমেইল পাঠানো, ডাটা প্রসেসিং ইত্যাদি)। Celery একটি অ্যাসিঙ্ক্রোনাস টাস্ক কিউ সিস্টেম যা এই ধরনের কাজগুলো খুবই দ্রুত এবং কার্যকরভাবে পরিচালনা করতে সাহায্য করে।

এই টিউটোরিয়ালে, আমরা FastAPI এবং Celery এর ইন্টিগ্রেশন কিভাবে করবেন, তা বিস্তারিতভাবে দেখব।


Step 1: প্রয়োজনীয় প্যাকেজ ইনস্টল করা

প্রথমে, আপনাকে Celery এবং Redis (অথবা অন্য কিউ সিস্টেম) ইনস্টল করতে হবে। Redis হল একটি জনপ্রিয় কিউ সিস্টেম যা Celery এর সাথে ব্যবহার করা হয়।

pip install celery[redis]
pip install fastapi
pip install uvicorn

এখানে, celery[redis] দিয়ে আমরা Celery এবং Redis সাপোর্টেড প্যাকেজ ইনস্টল করছি।


Step 2: Celery কনফিগারেশন

Celery এর সাথে কনফিগারেশন করতে Celery ইনস্ট্যান্স তৈরি করতে হবে এবং কিউ হিসেবে Redis ব্যবহার করা হবে।

উদাহরণ: Celery কনফিগারেশন

# celery_worker.py

from celery import Celery

# Redis কনফিগারেশন
celery_app = Celery(
    'worker',
    broker='redis://localhost:6379/0',  # Redis কিউ সার্ভার URL
    backend='redis://localhost:6379/0', # Redis ব্যাকএন্ড
)

# টাস্ক ডিফাইন করা
@celery_app.task
def add_numbers(a, b):
    return a + b

এখানে, celery_app হল আমাদের Celery অ্যাপ্লিকেশন যা Redis কিউ সার্ভার ব্যবহার করবে। add_numbers হল একটি সাধারণ টাস্ক যা দুইটি সংখ্যার যোগফল বের করে।


Step 3: FastAPI অ্যাপ তৈরি করা

এখন, আমরা FastAPI অ্যাপ তৈরি করব যা Celery টাস্কগুলোকে কল করবে।

উদাহরণ: FastAPI অ্যাপ

from fastapi import FastAPI
from celery_worker import add_numbers  # Celery টাস্ক আমদানি করা

app = FastAPI()

@app.get("/")
async def read_root():
    return {"message": "FastAPI and Celery Integration"}

@app.get("/add/{a}/{b}")
async def add(a: int, b: int):
    # Celery টাস্ক কল করা
    task = add_numbers.apply_async(args=[a, b])
    return {"task_id": task.id, "status": task.status}

এখানে:

  • /add/{a}/{b} রাউটে দুইটি ইনপুট সংখ্যা পাঠানো হচ্ছে।
  • add_numbers.apply_async() ব্যবহার করে Celery টাস্ক a এবং b সংখ্যার যোগফল বের করার জন্য ব্যাকগ্রাউন্ডে চালানো হবে।

Step 4: Celery Worker চালানো

এখন, আপনাকে Celery Worker চালাতে হবে যেটি Redis কিউ থেকে টাস্ক গ্রহণ করবে এবং সেগুলো প্রসেস করবে।

celery -A celery_worker.celery_app worker --loglevel=info

এখানে, celery_worker.celery_app হল Celery অ্যাপের পাথ এবং worker হচ্ছে Celery Worker, যা Redis কিউ থেকে টাস্ক গ্রহণ করবে এবং একে একে সেগুলো সম্পন্ন করবে।


Step 5: FastAPI অ্যাপ চালানো

এখন FastAPI অ্যাপ চালানোর জন্য uvicorn ব্যবহার করুন।

uvicorn main:app --reload

এখানে, main:app হল FastAPI অ্যাপের পাথ, এবং --reload ফ্ল্যাগ ডেভেলপমেন্ট পরিবেশে অটোমেটিক রিলোড করার জন্য ব্যবহৃত হয়।


Step 6: টাস্ক ফিল্ড চেক করা

এখন, আপনি যদি FastAPI এ /add/{a}/{b} এন্ডপয়েন্টে GET রিকোয়েস্ট পাঠান, তবে Celery টাস্ক চলে যাবে এবং একটি টাস্ক আইডি ফেরত পাওয়া যাবে।

উদাহরণ:

GET /add/5/10

রেসপন্স:

{
  "task_id": "some-task-id",
  "status": "PENDING"
}

Redis কিউয়ে Celery Worker টাস্কটি গ্রহণ করবে এবং সম্পন্ন করার পর স্ট্যাটাস আপডেট হবে।

রেসপন্স (টাস্ক সম্পন্ন হওয়ার পর):

{
  "task_id": "some-task-id",
  "status": "SUCCESS",
  "result": 15
}

Step 7: Celery টাস্কের ফলাফল চেক করা

আপনি যদি টাস্কের ফলাফল চেক করতে চান, তবে AsyncResult ব্যবহার করতে পারেন। যেমন:

from celery.result import AsyncResult

@app.get("/result/{task_id}")
async def get_result(task_id: str):
    task_result = AsyncResult(task_id)
    if task_result.state == "SUCCESS":
        return {"task_id": task_id, "result": task_result.result}
    return {"task_id": task_id, "status": task_result.state}

এখানে AsyncResult টাস্কের স্ট্যাটাস এবং ফলাফল জানতে ব্যবহৃত হবে।


Step 8: টাস্ক ফ্লেভার কনফিগারেশন এবং অপটিমাইজেশন

Celery টাস্কে কাস্টম টাইমআউট:

Celery টাস্কের জন্য আপনি টাইমআউট কনফিগার করতে পারেন, যাতে কোনো টাস্ক অত্যধিক সময় নিলে তা বন্ধ হয়ে যায়।

celery_app.conf.update(
    task_time_limit=300,  # টাস্কের জন্য সর্বোচ্চ সময় (৫ মিনিট)
)

Celery-তে ব্যাকগ্রাউন্ড টাস্ক

FastAPI এবং Celery-তে ব্যাকগ্রাউন্ড টাস্কগুলি চালানোর জন্য apply_async() ব্যবহার করা হয়, যা টাস্ককে অবিলম্বে অথবা নির্দিষ্ট সময় পর চালানোর জন্য ব্যবহৃত হয়।


FastAPI এবং Celery এর ইন্টিগ্রেশন দিয়ে আপনি আপনার অ্যাপ্লিকেশনকে asynchronous এবং scalable করতে পারেন। এই ইন্টিগ্রেশন ব্যাকগ্রাউন্ড টাস্ক ম্যানেজমেন্টের জন্য খুবই উপকারী, বিশেষত যখন আপনাকে বড় কাজগুলো (যেমন ইমেইল পাঠানো, ডাটা প্রসেসিং) অ্যাসিঙ্ক্রোনাসভাবে করতে হয়। Celery এর মাধ্যমে আপনি আপনার অ্যাপের কার্যক্ষমতা ও প্রতিক্রিয়া সময়কে উন্নত করতে পারবেন।

Content added By

Background Tasks এবং Distributed Task Queue

204

FastAPI তে Background Tasks এবং Distributed Task Queue ব্যবস্থাপনা একটি গুরুত্বপূর্ণ ফিচার যা অ্যাপ্লিকেশনের পারফরম্যান্স এবং স্কেলেবিলিটি বাড়ানোর জন্য ব্যবহৃত হয়। যখন আপনার অ্যাপ্লিকেশনকে ব্যাকগ্রাউন্ডে কাজ চালানোর জন্য সক্ষম করতে হয় বা long-running tasks পরিচালনা করতে হয়, তখন Background Tasks এবং Distributed Task Queue ব্যবহৃত হয়।

এখানে আমরা FastAPI এবং Celery এর ইন্টিগ্রেশন এবং ব্যাকগ্রাউন্ড টাস্ক ব্যবস্থাপনা কিভাবে করা যায় তা দেখব।


1. Background Tasks in FastAPI

FastAPI তে Background Tasks ব্যবহারের মাধ্যমে আপনি এক্সপেন্সিভ বা টাইম-কনজিউমিং কাজগুলো ব্যাকগ্রাউন্ডে চালাতে পারেন, যাতে এটি প্রধান HTTP রিকোয়েস্ট প্রসেসিং এর সময় বিলম্ব না ঘটায়।

উদাহরণ: Background Tasks

FastAPI তে BackgroundTask ব্যবহার করার জন্য BackgroundTasks ক্লাসটি ইনজেক্ট করতে হয়, এবং এতে এক বা একাধিক টাস্ক ব্যাকগ্রাউন্ডে চালানো হয়।

from fastapi import FastAPI, BackgroundTasks
import time

app = FastAPI()

# ব্যাকগ্রাউন্ড ফাংশন
def write_log(message: str):
    time.sleep(5)
    with open("log.txt", mode="a") as log:
        log.write(message + "\n")

# এন্ডপয়েন্ট যেখানে ব্যাকগ্রাউন্ড টাস্ক ব্যবহৃত হবে
@app.get("/send-email/")
async def send_email(background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, "Email sent successfully!")
    return {"message": "Email has been sent in the background."}

ব্যাখ্যা:

  • BackgroundTasks: এটি FastAPI এর এক্সটেনশন যা ব্যাকগ্রাউন্ড টাস্ক পরিচালনা করতে সাহায্য করে।
  • add_task(): ব্যাকগ্রাউন্ডে যেকোনো ফাংশন (এখানে write_log) চালানোর জন্য এটি ব্যবহার করা হয়।

রিকোয়েস্ট উদাহরণ:

GET /send-email/

রেসপন্স:

{
  "message": "Email has been sent in the background."
}

এখানে, write_log ফাংশনটি ব্যাকগ্রাউন্ডে কাজ করবে, এবং ব্যবহারকারী রেসপন্স পাবে কিন্তু কাজটি সম্পন্ন হতে কিছু সময় নিবে।


2. Celery এর মাধ্যমে Distributed Task Queue

Celery একটি distributed task queue ব্যবস্থা যা FastAPI এর সাথে খুব সহজেই ইন্টিগ্রেট করা যায়। Celery ব্যবহার করে আপনি অ্যাসিঙ্ক্রোনাস বা ব্যাকগ্রাউন্ড টাস্ক পরিচালনা করতে পারেন, যা স্কেলেবল এবং ডিস্ট্রিবিউটেড।

Step 1: Celery ইনস্টল করা

Celery এবং Redis বা RabbitMQ (message broker) ইনস্টল করতে হবে।

pip install celery redis

এখানে, আমরা Redis ব্যবহার করব যেহেতু এটি জনপ্রিয় message broker হিসেবে ব্যবহৃত হয়।

Step 2: Celery অ্যাপ কনফিগারেশন

FastAPI অ্যাপ্লিকেশনে Celery যোগ করার জন্য আপনাকে একটি Celery অ্যাপ কনফিগার করতে হবে।

# celery_app.py
from celery import Celery

# Redis broker ব্যবহার করে Celery অ্যাপ তৈরি
celery_app = Celery('tasks', broker='redis://localhost:6379/0')

@celery_app.task
def send_email_task(email: str):
    # দীর্ঘ সময়ের কাজ যেমন ইমেইল পাঠানো
    time.sleep(5)
    return f"Email sent to {email}"

এখানে:

  • Celery('tasks', broker='redis://localhost:6379/0'): Celery অ্যাপ এবং Redis ব্রোকার কনফিগার করা হচ্ছে।
  • send_email_task: একটি Celery টাস্ক যা ব্যাকগ্রাউন্ডে ইমেইল পাঠানোর মতো কাজ করবে।

Step 3: FastAPI তে Celery ইন্টিগ্রেট করা

from fastapi import FastAPI
from celery_app import send_email_task

app = FastAPI()

@app.get("/send-email/{email}")
async def send_email(email: str):
    send_email_task.delay(email)  # Celery টাস্কে কল করা
    return {"message": "Email is being sent in the background."}

ব্যাখ্যা:

  • send_email_task.delay(email): এটি Celery টাস্ককে asynchronous ভাবে রান করতে সাহায্য করে।
  • delay(): Celery টাস্ক রান করার জন্য এটি asynchronous কল ব্যবহার করা হয়।

Step 4: Celery Worker রান করা

Celery worker রান করার জন্য আপনাকে আলাদা কমান্ডে Celery চালাতে হবে:

celery -A celery_app.celery_app worker --loglevel=info

এটি Redis সার্ভারের মাধ্যমে কাজ করবে এবং বিভিন্ন টাস্ক প্রসেস করবে।

Step 5: FastAPI অ্যাপ চালানো

FastAPI অ্যাপ চালানোর জন্য:

uvicorn main:app --reload

রিকোয়েস্ট উদাহরণ:

GET /send-email/test@example.com

রেসপন্স:

{
  "message": "Email is being sent in the background."
}

এই কমান্ডে, send_email_task টাস্কটি Redis তে প্রেরিত হবে এবং Celery worker তা প্রসেস করবে, এদিকে FastAPI অ্যাপ্লিকেশন ক্লায়েন্টকে দ্রুত রেসপন্স প্রদান করবে।


3. Celery এবং FastAPI তে Error Handling এবং Retries

Celery তে টাস্কের জন্য error handling এবং retries সেটআপ করা যায়। উদাহরণস্বরূপ:

@celery_app.task(bind=True, max_retries=3)
def send_email_task(self, email: str):
    try:
        # ইমেইল পাঠানোর কাজ
        # simulate error
        if email == "error@example.com":
            raise ValueError("Simulated error")
        return f"Email sent to {email}"
    except Exception as exc:
        raise self.retry(exc=exc, countdown=5)

এখানে:

  • max_retries=3: টাস্কটি সর্বাধিক ৩ বার পুনরায় চেষ্টা করবে।
  • retry(): যদি কোনো ত্রুটি ঘটে, তাহলে টাস্কটি পুনরায় চেষ্টা করবে।

FastAPI তে Background Tasks এবং Celery ব্যবহার করে asynchronous এবং distributed task queue ব্যবস্থা তৈরি করা সহজ এবং কার্যকর। Background Tasks ছোট ও সহজ কাজের জন্য এবং Celery বড় এবং স্কেলেবল অ্যাসিঙ্ক্রোনাস কাজের জন্য ব্যবহৃত হয়। এই সিস্টেম ব্যবহারের মাধ্যমে আপনি অ্যাপ্লিকেশনকে আরও স্কেলেবল এবং পারফরম্যান্ট তৈরি করতে পারেন। Celery সহ ডিস্ট্রিবিউটেড টাস্ক কিউ ব্যবস্থাপনা উন্নত পারফরম্যান্স এবং সুরক্ষা প্রদান করে।

Content added By

Redis বা RabbitMQ ব্যবহার করে Celery Queue ম্যানেজ করা

192

FastAPI এর সাথে Celery এবং Redis বা RabbitMQ ব্যবহার করে আপনার অ্যাপ্লিকেশনে অ্যাসিঙ্ক্রোনাস কাজ (Asynchronous tasks) এবং ব্যাকগ্রাউন্ড টাস্ক পরিচালনা করা সম্ভব। Celery হলো একটি শক্তিশালী টাস্ক কিউ ব্যবস্থাপনা সিস্টেম যা ডিস্ট্রিবিউটেড টাস্ক কিউ সিস্টেমের মাধ্যমে ব্যাকগ্রাউন্ডে চলমান টাস্কগুলো পরিচালনা করে। এখানে আমরা দেখব কিভাবে Redis বা RabbitMQ ব্যবহার করে Celery Queue ম্যানেজ করা যায় এবং এটি FastAPI এর সাথে ইন্টিগ্রেট করা যায়।


Celery এবং Redis/RabbitMQ কেন ব্যবহার করবেন?

  1. ব্যাকগ্রাউন্ড টাস্ক: Web request-response চক্রের বাইরে লম্বা-running কাজ (যেমন ইমেইল পাঠানো, ডাটাবেস ব্যাচ প্রসেসিং) করা যায়।
  2. স্কেলেবিলিটি: Redis বা RabbitMQ সার্ভার ব্যবহার করে টাস্কগুলো ডিস্ট্রিবিউটেড সিস্টেমে স্কেল করা যায়, যাতে লোড বেশি হওয়ায় কাজের ধীরগতি না হয়।
  3. অ্যাসিঙ্ক্রোনাস এক্সিকিউশন: CPU-intensive কাজকে অ্যাসিঙ্ক্রোনাসভাবে প্রসেস করতে সক্ষম।

প্রয়োজনীয় প্যাকেজ ইনস্টলেশন

Step 1: Celery, Redis বা RabbitMQ ইনস্টল করুন

আপনার প্রজেক্টের জন্য নিচের প্যাকেজগুলো ইনস্টল করুন:

pip install fastapi celery redis

RabbitMQ ব্যবহারের জন্য:

pip install celery[redis]

Step 2: Redis বা RabbitMQ সার্ভার সেটআপ

  • Redis: আপনি Redis এর অফিশিয়াল সাইট থেকে Redis সার্ভার ইনস্টল করতে পারেন বা ডকার ব্যবহার করতে পারেন।
# Redis ইনস্টল
sudo apt-get install redis-server
  • RabbitMQ: RabbitMQ ইনস্টল করতে:
sudo apt-get install rabbitmq-server

Celery সেটআপ FastAPI এর সাথে

Step 3: Celery কনফিগারেশন

Celery Configuration ফাইল তৈরি করা হবে এবং এখানে Redis বা RabbitMQ কনফিগারেশন থাকবে।

উদাহরণ: Celery সেটআপ Redis এর সাথে

# celery_worker.py
from celery import Celery

# Redis কনফিগারেশন
app = Celery('worker', broker='redis://localhost:6379/0')

# কাজের ফাংশন
@app.task
def send_email(email: str):
    # ইমেইল পাঠানোর কাস্টম লজিক
    print(f"Sending email to {email}")
    return f"Email sent to {email}"

এখানে, Redis ব্যবহৃত হচ্ছে টাস্ক কিউ (broker) হিসেবে। আপনার Redis সার্ভার যদি অন্য মেশিনে থাকে তবে আপনাকে broker='redis://your_redis_host:6379/0' উল্লেখ করতে হবে।

Step 4: FastAPI অ্যাপ্লিকেশন সেটআপ

# main.py
from fastapi import FastAPI
from celery_worker import send_email

app = FastAPI()

@app.post("/send-email/{email}")
async def send_email_task(email: str):
    send_email.delay(email)
    return {"message": f"Email task for {email} has been accepted"}

এখানে, send_email.delay() ব্যবহার করা হয়েছে যাতে টাস্কটি অ্যাসিঙ্ক্রোনাসভাবে কল হয় এবং তা Celery কিউতে চলে যাবে।


Step 5: Celery Worker চালানো

Celery worker রান করার জন্য এই কমান্ডটি ব্যবহার করুন:

celery -A celery_worker.app worker --loglevel=info

এটি Celery worker শুরু করবে এবং টাস্কগুলো কিউ থেকে প্রসেস করবে।


Step 6: FastAPI অ্যাপ চালানো

FastAPI অ্যাপ চালাতে:

uvicorn main:app --reload

এখানে main ফাইলের নাম এবং app FastAPI অ্যাপের অবজেক্ট।


উদাহরণ: RabbitMQ ব্যবহার

RabbitMQ কনফিগারেশন একে অপরের থেকে কিছুটা আলাদা হলেও, তাতে অনেক ক্ষেত্রেই একই প্রক্রিয়া অনুসরণ করা হয়। এখানে, Celery কনফিগারেশন RabbitMQ কে ব্রোকার হিসেবে ব্যবহার করবে।

# celery_worker.py
from celery import Celery

# RabbitMQ কনফিগারেশন
app = Celery('worker', broker='pyamqp://guest@localhost//')

@app.task
def process_task(data: str):
    print(f"Processing: {data}")
    return f"Task processed: {data}"

এখানে pyamqp://guest@localhost// RabbitMQ ব্রোকার কনফিগারেশন, যা RabbitMQ সার্ভারের মাধ্যমে কাজ করবে।


Task Queue এবং Worker Scaling

FastAPI এবং Celery এর মধ্যে টাস্ক কিউ ব্যবস্থাপনা সম্ভবত একটি ডিস্ট্রিবিউটেড সিস্টেমের জন্য ব্যবহৃত হয়। বিভিন্ন workers দিয়ে টাস্কগুলো স্কেল করা সম্ভব। একাধিক worker ব্যবহার করে Celery কে স্কেল করা যেতে পারে, যাতে উচ্চ লোডের সময় কাজের গতিকে বজায় রাখা যায়।

Celery worker-দের স্কেল করার জন্য:

celery -A celery_worker.app worker --loglevel=info --concurrency=4

এখানে --concurrency=4 প্যারামিটারটি worker-কে ৪টি থ্রেডের মাধ্যমে কাজ করতে বলছে।


FastAPI এবং Celery এর মাধ্যমে আপনি অ্যাসিঙ্ক্রোনাস এবং ব্যাকগ্রাউন্ড টাস্ক সিস্টেম তৈরি করতে পারেন, যা Redis বা RabbitMQ ব্রোকার ব্যবহার করে স্কেলেবল এবং কার্যকর টাস্ক কিউ ম্যানেজমেন্ট সরবরাহ করে। এটি দীর্ঘ-running কাজগুলো যেমন ইমেইল পাঠানো, ফাইল প্রসেসিং, ডাটাবেস ব্যাচ প্রসেসিং ইত্যাদি পরিচালনা করার জন্য অত্যন্ত উপকারী।

Content added By
Promotion
NEW SATT AI এখন আপনাকে সাহায্য করতে পারে।

Are you sure to start over?

Loading...